package com.cantor.core.center.impl;

import cn.hutool.core.util.StrUtil;
import com.cantor.core.center.RegistrationCenter;
import com.google.common.base.Joiner;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryForever;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Zookeeper注册中心实现
 * 可以理解为是根据接口规范的统一工具类
 * 注意: 传路径的时候千万不要带有"/" , 方法里都封装好了!
 */
@Slf4j
@Accessors(chain = true)
public class ZooKeeperRegistrationCenter implements RegistrationCenter<CuratorFramework> {

    // ip:port
    @Getter
    @Setter
    private String address = "127.0.0.1:2181";

    // 不要set这个, 由本类自己初始化
    private CuratorFramework zkClient;

    // 全局命名空间, 不要有"/"
    @Getter
    @Setter
    private String namespace = "cantor";

    // 编码
    @Getter
    @Setter
    private Charset encoding = Charset.forName("UTF-8");

    // 分布式id的节点名, 一定要有"/"开头, 且不包括namespace
    @Getter
    @Setter
    private String sequenceIdZNode = "/seq";

    // 不要设置这个, 由本类负责初始化
    private DistributedAtomicLong distributedAtomicLong;

    // 初始化连接, 启动客户端
    @Override
    public void run() {
        if(null != zkClient && zkClient.isStarted()){
            return;
        }
        // 构建CuratorFramework
        zkClient = CuratorFrameworkFactory.builder()
                .connectString(address)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 10))
                .namespace(namespace) // 不用每次都加app名前缀
                .build();
        // 准备分布式ID生成器, 这里采用每隔一段时间无限重试策略,符合其内部自旋锁机制.
        distributedAtomicLong = new DistributedAtomicLong(zkClient, sequenceIdZNode, new RetryForever(5));
        // 启动ZooKeeper客户端
        zkClient.start();
        try {
            // 5秒内没有连接成功, 就报错
            zkClient.blockUntilConnected(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("连接远程zookeeper失败");
        }
    }

    // 获取zookeeper客户端
    @Override
    public CuratorFramework getClient() {
        return zkClient;
    }

    // 创建临时节点
    @Override
    public boolean createNodeE(String path) {
        return createNodeE(path, null);
    }

    // 创建带数据临时节点
    @Override
    public boolean createNodeE(String path, String data) {
        try {
            ACLBackgroundPathAndBytesable<String> acl = zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL);
            String donePath;
            if (StrUtil.isNotEmpty(data)) {
                donePath = acl.forPath(path, data.getBytes(encoding));
            } else {
                donePath = acl.forPath(path);
            }
            return StrUtil.isNotEmpty(donePath);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类创建临时节点失败");
            return false;
        }
    }

    // 创建持久节点
    @Override
    public boolean createNode(String path) {
        return createNode(path, null);
    }

    @Override
    public boolean createNode(String path, String data) {
        try {
            ACLBackgroundPathAndBytesable<String> acl = zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT);
            String donePath;
            if (StrUtil.isNotEmpty(data)) {
                donePath = acl.forPath(path, data.getBytes());
            } else {
                donePath = acl.forPath(path);
            }
            return StrUtil.isNotEmpty(donePath);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类创建持久节点失败");
            return false;
        }
    }

    // 删除节点
    @Override
    public void deleteNode(String... path) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(path);
            zkClient.delete()
                    .guaranteed()
                    .deletingChildrenIfNeeded()
                    .forPath(multiPathStr);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类删除节点失败");
        }
    }

    // 获取某节点数据
    @Override
    public String getData(String... path) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(path);
            byte[] bytes = zkClient.getData()
                    .forPath(multiPathStr);
            return new String(bytes, encoding);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类获取节点数据失败");
            return null;
        }
    }

    // 为某一节点设置数据
    @Override
    public boolean setData(String data, String... path) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(path);
            // 先查到当前version
            Stat stat1 = new Stat();
            zkClient.getData()
                    .storingStatIn(stat1)
                    .forPath(multiPathStr);
            // 在根据这个version进行修改,如果version不对, 修改失败
            Stat stat2 = zkClient.setData()
                    .withVersion(stat1.getVersion())
                    .forPath(multiPathStr, data.getBytes(encoding));
            // 如果修改后的数据版本等于修改前版本+1,则认为修改成功
            return stat2.getVersion() - stat1.getVersion() == 1;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类修改节点数据失败");
            return false;
        }
    }

    // 获取所有子节点
    @Override
    public List<String> getChildren(String... path) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(path);
            return zkClient.getChildren()
                    .forPath(multiPathStr);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类查询子节点失败");
            return null;
        }
    }

    // 关闭连接
    @Override
    public boolean close() {
        try {
            if (null != zkClient) {
                zkClient.close();
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类关闭与zk的连接失败");
            return false;
        }
    }

    @Override
    public boolean createMultiNode(String... paths) {
        return createMultiNodeWithData(null, paths);
    }

    @Override
    public boolean createMultiNodeWithData(String data, String... paths) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(paths);
            ACLBackgroundPathAndBytesable<String> acl = zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT);
            String donePath;
            if (StrUtil.isNotEmpty(data)) {
                donePath = acl.forPath(multiPathStr, data.getBytes(encoding));
            } else {
                donePath = acl.forPath(multiPathStr);
            }
            return StrUtil.isNotEmpty(donePath);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类创建多级节点失败");
            return false;
        }
    }

    // 创建多级临时节点(自动拼接"/")
    @Override
    public boolean createMultiNodeE(String... paths) {
        return createMultiNodeEWithData(null, paths);
    }

    // 创建带数据的多级临时节点(自动拼接"/")
    @Override
    public boolean createMultiNodeEWithData(String data, String... paths) {
        try {
            String multiPathStr = "/" + Joiner.on("/").join(paths);
            ACLBackgroundPathAndBytesable<String> acl = zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL);
            String donePath;
            if (StrUtil.isNotEmpty(data)) {
                donePath = acl.forPath(multiPathStr, data.getBytes(encoding));
            } else {
                donePath = acl.forPath(multiPathStr);
            }
            return StrUtil.isNotEmpty(donePath);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("注册中心实现类创建多级临时节点失败");
            return false;
        }
    }

    // 判断节点是否存在(path不要带"/")
    @Override
    public boolean exists(String... paths) {
        String multiPathStr = null;
        try {
            multiPathStr = "/" + Joiner.on("/").join(paths);
            return zkClient.checkExists().forPath(multiPathStr) != null;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("判断zookeeper节点是否存在时出现异常, nodePaht:{}" + multiPathStr);
            return false;
        }
    }

    // 分布式id
    @Override
    public long getSequenceId() throws Exception {
        AtomicValue<Long> atomicValue = distributedAtomicLong.increment();
        if (!atomicValue.succeeded()) {
            throw new RuntimeException();
        }
        return atomicValue.postValue();
    }
}
